"""

"""

from typing import Optional

import sqlalchemy
from pydantic.types import conint
from sqlalchemy.orm import Session
from sqlalchemy import func, or_, not_
from app.utils.custom_exc import CustomException
from app.utils.file_storage import get_storage_file
from app.utils.security import get_password_hash, verify_password
from app.utils.curd_base import CRUDBase, ModelCRUD
from app.models.system import Users, UserRole
from ..schemas import users_schema
from fastapi.encoders import jsonable_encoder


class CRUDUser(CRUDBase[Users, users_schema.UserCreate, users_schema.UserUpdate]):

    @staticmethod
    def get_userinfo(user: Users) -> dict:
        """
        获取用户信息
        """
        resp = {
            'id': user.id,
            'user_uuid': user.user_uuid,
            'nickname': user.nickname,
            'username': user.username,
            'avatar': get_storage_file(user.avatar),
            'phone': user.phone,
            'gender': user.gender,
            'country': user.country,
            'province': user.province,
            'city': user.city,
            'is_active': user.is_active,
            'is_superuser': user.is_superuser,
            'register_time': user.register_time.strftime('%Y-%m-%d %H:%M:%S') if user.register_time else user.register_time,
            'role': [{"name": ins.role.name, "id": ins.role_id} for ins in user.role]
        }
        return resp

    @staticmethod
    def get_by_username(db: Session, *, username: str) -> Optional[Users]:
        """
        username
        参数里面的* 表示 后面调用的时候 要用指定参数的方法调用
        :param db:
        :param username:
        :return:
        """
        return db.query(Users).filter(Users.username == username).first()

    def create(self, db: Session, *, obj_in) -> Users:
        try:
            # print(jsonable_encoder(obj_in, exclude_none=True))
            obj_dict = jsonable_encoder(obj_in)
            # print(obj_dict)
            pwd = obj_dict.pop('password', None)
            password1 = obj_dict.pop('password1', None)
            if password1 != pwd:
                msg = "两次密码不一致"
                raise CustomException(msg)
            obj_dict['hashed_password'] = get_password_hash(pwd)
            db_obj = Users(**obj_dict)  # type: ignore
            db.add(db_obj)
            db.commit()
            db.refresh(db_obj)
            return db_obj
        except Exception as e:
            if isinstance(e, sqlalchemy.exc.IntegrityError):
                msg = "已存在用户"
                raise CustomException(msg)
            else:
                raise Exception(e)

    def partial_update(self, db: Session, *, user: Users, obj_in: users_schema.ModifyUserInfor) -> Users:
        obj = self.get_user(db, user_id=user.id)
        obj_dict = jsonable_encoder(obj_in, exclude_none=True)

        for k, v in obj_dict.items():
            setattr(obj, k, v)

        db.commit()
        return obj

    def password_update(self, db: Session, *, user: Users, obj_in: users_schema.UpdatePassword) -> Users:
        obj = self.get_user(db, user_id=user.id)
        obj_dict = jsonable_encoder(obj_in)
        password0 = obj_dict.pop('password0', None)
        if not verify_password(password0, user.hashed_password):
            msg = "原密码错误"
            raise CustomException(msg)
        password1 = obj_dict.pop('password1', None)
        password2 = obj_dict.pop('password2', None)
        if password1 != password2:
            msg = "两次密码不一致"
            raise CustomException(msg)
        setattr(obj, 'hashed_password', get_password_hash(password1))
        # user.hashed_password = get_password_hash(password1)
        db.commit()
        return obj

    def avatar_update(self, db: Session, *, user: Users, avatar: str) -> Users:
        obj = self.get_user(db, user_id=user.id)
        setattr(obj, 'avatar', avatar)
        db.commit()
        return obj

    def authenticate(self, db: Session, *, username: str, password: str) -> Optional[Users]:
        user = self.get_by_username(db, username=username)
        if not user:
            return None
        if not verify_password(password, user.hashed_password):
            return None
        return user

    def get_user_list(self, db: Session, *, page: int = 1, page_size: conint(le=50) = 10, search_info: str = None):
        temp_page = (page - 1) * page_size
        total_q = db.query(func.count(Users.id)).filter(Users.is_delete == 0)
        if search_info:
            total_q = total_q.filter(or_(Users.user_uuid.contains(search_info), Users.nickname.contains(search_info),
                                         Users.username.contains(search_info), Users.phone.contains(search_info),))
        total = total_q.scalar()
        user_set = db.query(Users).filter()
        if search_info:
            user_set = user_set.filter(or_(Users.user_uuid.contains(search_info), Users.nickname.contains(search_info),
                                           Users.username.contains(search_info), Users.phone.contains(search_info),))
        user_list = user_set.offset(temp_page).limit(page_size).all()
        items = [self.get_userinfo(obj) for obj in user_list]
        resp_dict = dict()
        resp_dict['items'] = items
        resp_dict['total'] = total
        return resp_dict

    def get_user(self, db: Session, *, user_id: int):
        user = db.query(Users).filter(Users.id == user_id).first()
        return user

    @staticmethod
    def is_active(user: Users) -> bool:
        return user.is_active == 1


curd_user = CRUDUser(Users)


class CRUDUserRole(ModelCRUD):
    model = UserRole

    def set_user_role(self, db: Session, *, req_body: users_schema.SetUserRole):
        req_data = jsonable_encoder(req_body)
        role_list = req_data.pop('role', [])
        user_id = req_data.get('user_id')

        queryset = db.query(self.model).filter(self.model.user_id == user_id)  # type: ignore
        for role_id in role_list:
            if not queryset.filter(self.model.role_id == role_id).first():
                instance = self.model(user_id=user_id, role_id=role_id)  # type: ignore
                db.add(instance)
        del_queryset = queryset.filter(not_(self.model.role_id.in_(role_list)))
        del_queryset.delete()
        db.commit()
        return True


crud_user_role = CRUDUserRole()


